﻿using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System;
using System.Diagnostics;

namespace RabbitMQ.Gateway
{
	/// <summary>
	/// Blocking message-receive loop built on <see cref="QueueingBasicConsumer"/>.
	/// </summary>
	public static class MessageProcessor
	{
		// Class-level logger for this class's own diagnostics. It is distinct from the
		// "loger" argument of MessageReceiver, which is only forwarded to the handler.
		private static readonly log4net.ILog log = RabbitMQ.Gateway.Log.loger;

		// Handler run time (in milliseconds) above which a performance warning is logged.
		private const long SlowHandlerThresholdMilliseconds = 10000;

		/// <summary>
		/// Consumes messages by subscribing a <see cref="QueueingBasicConsumer"/> to one or
		/// more queues and passing each dequeued delivery to <paramref name="handler"/>.
		/// </summary>
		/// <remarks>
		/// The call blocks on the consumer's queue until a delivery is available.
		/// If subscribing fails, the error is logged and the method returns without
		/// entering the receive loop. Exceptions are logged, never rethrown.
		/// While <c>Control.IsStopConsumer</c> is set, a dequeued delivery is not passed to
		/// the handler; this method itself never acknowledges or rejects deliveries.
		/// </remarks>
		/// <param name="channel">Channel used to subscribe; also forwarded to the handler.</param>
		/// <param name="queue">Queue name, or several names separated by commas. Each name is trimmed before subscribing.</param>
		/// <param name="handler">Callback invoked for every dequeued delivery.</param>
		/// <param name="loger">Logger forwarded unchanged to <paramref name="handler"/>.</param>
		/// <param name="ErrorMethod">Error callback forwarded unchanged to <paramref name="handler"/>.</param>
		/// <param name="OneByOne">
		/// When <c>true</c> (the default) the method returns after one receive attempt.
		/// When <c>false</c> it keeps receiving until an exception other than
		/// <see cref="OperationInterruptedException"/> is caught.
		/// </param>
		public static void MessageReceiver(IModel channel, string queue, MessageConsumer handler,log4net.ILog loger,Func<object,string,string,bool> ErrorMethod,bool OneByOne = true)
		{
			QueueingBasicConsumer queueingBasicConsumer;
			try
			{
				queueingBasicConsumer = new QueueingBasicConsumer(channel);

				// One consumer instance is subscribed to every queue in the list.
				string[] queueNames = queue.Split(',');
				foreach (string queueName in queueNames)
				{
					channel.BasicConsume(queueName.Trim(), false, queueingBasicConsumer);
				}
			}
			catch (Exception ex)
			{
				log.Error(ex.ToString());
				// Without a fully subscribed consumer the loop below would either hit a
				// null reference or wait on a queue that may never receive anything.
				return;
			}

			Stopwatch watch = new Stopwatch();
			do
			{
				// Clear the previous iteration's timing first, so a failure while waiting
				// for a delivery is not logged with a stale handler duration.
				watch.Reset();
				try
				{
					BasicDeliverEventArgs basicDeliverEventArgs = queueingBasicConsumer.Queue.Dequeue() as BasicDeliverEventArgs;

					Message message = new Message
					{
						Properties = basicDeliverEventArgs.BasicProperties,
						Body = basicDeliverEventArgs.Body,
						RoutingKey = basicDeliverEventArgs.RoutingKey
					};
					DeliveryHeader header = new DeliveryHeader
					{
						Redelivered = basicDeliverEventArgs.Redelivered,
						DeliveryTag = basicDeliverEventArgs.DeliveryTag
					};

					watch.Start();

					if (!Control.IsStopConsumer)
					{
						handler(message, channel, header, loger, ErrorMethod);
					}
					watch.Stop();

					if (watch.ElapsedMilliseconds > SlowHandlerThresholdMilliseconds)
					{
						log.WarnFormat("【性能报警】队列{0}的消费者执行的时间超过10秒钟,请检查确定可否优化",queue);
					}
				}
				catch (OperationInterruptedException ex)
				{
					// Logged only; the loop condition is left untouched for this exception type.
					watch.Stop();
					log.Error(ex.ToString());
				}
				catch (Exception ex)
				{
					watch.Stop();
					log.ErrorFormat("异常函数执时长={0}毫秒,异常详细信息={1}",watch.ElapsedMilliseconds,ex);
					// Any other failure ends the receive loop.
					OneByOne = true;
				}
			} while (!OneByOne);
		}
	}
}
